package com.thrift.zookeeper.server;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;
import org.apache.thrift.TMultiplexedProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;

/**
 * @author sft
 *
 *         2016年5月23日
 */
public class ZKServer implements InitializingBean {
	private Logger log = Logger.getLogger(ZKServer.class);
	/**
	 * zk连接字符串
	 */
	private String connectionStr;
	/**
	 * thrift服务所在主机的ip
	 */
	private String hostName;
	/**
	 * thrift要暴露的端口
	 */
	private int port;
	/**
	 * zk保存此服务的父节点，可多层
	 */
	private String pathNode;
	/**
	 * 节点名称
	 */
	private String nodeName;
	/**
	 * 服务实现类，多个用逗号隔开
	 */
	private String ifaceImpls;

	/**
	 * 服务的名称，多个用逗号隔开，数量和顺序要对应服务实现类的数量和顺序
	 */
	private String seviceNames;
	/**
	 * zk能否启动,默认为false,thrift服务发布后，更改为true
	 */
	private boolean canZkStart = false;

	/**
	 * @return the connectionStr
	 */
	public String getConnectionStr() {
		return connectionStr;
	}

	/**
	 * @param connectionStr
	 *            the connectionStr to set
	 */
	public void setConnectionStr(String connectionStr) {
		this.connectionStr = connectionStr;
	}

	/**
	 * @return the nodeName
	 */
	public String getNodeName() {
		return nodeName;
	}

	/**
	 * @param nodeName
	 *            the nodeName to set
	 */
	public void setNodeName(String nodeName) {
		this.nodeName = nodeName;
	}

	/**
	 * @return the hostName
	 */
	public String getHostName() {
		return hostName;
	}

	/**
	 * @param hostName
	 *            the hostName to set
	 */
	public void setHostName(String hostName) {
		this.hostName = hostName;
	}

	/**
	 * @return the port
	 */
	public int getPort() {
		return port;
	}

	/**
	 * @param port
	 *            the port to set
	 */
	public void setPort(int port) {
		this.port = port;
	}

	/**
	 * @return the ifaceImpls
	 */
	public String getIfaceImpls() {
		return ifaceImpls;
	}

	/**
	 * @param ifaceImpls
	 *            the ifaceImpls to set
	 */
	public void setIfaceImpls(String ifaceImpls) {
		this.ifaceImpls = ifaceImpls;
	}

	/**
	 * @return the seviceNames
	 */
	public String getSeviceNames() {
		return seviceNames;
	}

	/**
	 * @param seviceNames
	 *            the seviceNames to set
	 */
	public void setSeviceNames(String seviceNames) {
		this.seviceNames = seviceNames;
	}

	/**
	 * @return the pathNode
	 */
	public String getPathNode() {
		return pathNode;
	}

	/**
	 * @param pathNode
	 *            the pathNode to set
	 */
	public void setPathNode(String pathNode) {
		this.pathNode = pathNode;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		thriftPublish();
		while (!canZkStart) {

		}
		new Thread(){
			@Override
			public void run() {
				
				try {
					zkStart();
				} catch (KeeperException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			};
		}.start();

	}

	public ZKServer() {
		super();
		// TODO Auto-generated constructor stub
	}

	/**
	 * zk启动
	 * 
	 * @throws InterruptedException
	 * @throws KeeperException
	 * @throws IOException
	 */
	public void zkStart() throws KeeperException, InterruptedException, IOException {
		log.info("===========开始连接zookeeper============");
		{
			// TODO Auto-generated method stub
			try {

				MyWatch watcher = new MyWatch();
				ZooKeeper zk = new ZooKeeper(connectionStr, 120000, watcher);
				// 创建服务的父节点
				String[] pathArr = pathNode.split("/");
				StringBuffer node = new StringBuffer();
				for (int i = 1; i < pathArr.length; i++) {
					log.info("名称：" + pathArr[i]);
					node.append("/");
					node.append(pathArr[i]);
					Stat path = null;
					try {
						path = zk.exists(node.toString(), watcher);
						if (path == null) {
							zk.create(node.toString(), "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						}

					} catch (KeeperException | InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						log.info("=========连接zookeeper异常，程序退出===========");
						System.exit(-1);
					}
				}
				// 添加服务
				//String serviceName = "/testThriftMuti";// 调用的服务名称
				JSONObject nodeJson = new JSONObject();
				nodeJson.put("ip", hostName);
				nodeJson.put("port", port);
				zk.create(pathNode +"/"+ nodeName, nodeJson.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (KeeperException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {

			}

		}

	}

	/**
	 * thrift 发布服务
	 * 
	 * @throws ThriftExcption
	 */
	public void thriftPublish() throws ThriftExcption {
		if (StringUtils.isEmpty(ifaceImpls)) {
			throw new ThriftExcption("服务实现类至少需要一个");

		}
		if (StringUtils.isEmpty(seviceNames)) {
			throw new ThriftExcption("需要配置服务实现类对应的服务名称");
		}
		final String[] ifaces = ifaceImpls.split(",");
		final String[] servicesNames = seviceNames.split(",");
		if (ifaces.length != servicesNames.length) {
			throw new ThriftExcption("服务实现类与服务名称数量不同");
		}
		// 开始做thrift服务发布,因为发布服务会阻塞程序执行，所以开启新线程来发布服务
		new Thread() {
			/*
			 * (non-Javadoc)
			 * 
			 * @see java.lang.Thread#run()
			 */
			@Override
			public void run() {
				TMultiplexedProcessor processor = new TMultiplexedProcessor();
				for (int i = 0; i < ifaces.length; i++) {
					String iface = ifaces[i];
					String serviceName = servicesNames[i];
					try {
						Class<?> ifaceImplClass = Class.forName(iface);// 加载服务实现类
						if (ifaceImplClass.getInterfaces() == null || ifaceImplClass.getInterfaces().length == 0) {
							throw new ThriftExcption("服务实现类必须实现对应的Iface接口");
						} else {
							Class<?> ifaceInterface = ifaceImplClass.getInterfaces()[0];
							if (!ifaceInterface.getSimpleName().equalsIgnoreCase("iface")) {
								throw new ThriftExcption("服务实现类必须实现对应的Iface接口");
							}
							Class<?> processorClass = null;
							for (Class<?> innerClass : ifaceInterface.getDeclaringClass().getDeclaredClasses()) {
								if (innerClass.getSimpleName().equalsIgnoreCase("Processor")) {
									processorClass = innerClass;
									break;
								}
							}
							if (processorClass == null) {
								throw new ThriftExcption("接口规范文件缺少Processor类的定义");
							}
							/*
							 * log.info("实现的接口："+ifaceInterface);
							 * for(Constructor<?>
							 * con:processorClass.getConstructors()){
							 * log.info("构造方法："+con);
							 * 
							 * }
							 */
							Constructor<?> cons = processorClass.getConstructor(ifaceInterface);
							processor.registerProcessor(serviceName, (TProcessor) cons.newInstance(ifaceImplClass.newInstance()));

						}
					} catch (ClassNotFoundException e) {
						e.printStackTrace();
						System.exit(-1);
					} catch (ThriftExcption e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						System.exit(-1);
					} catch (NoSuchMethodException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (SecurityException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InstantiationException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (IllegalAccessException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (IllegalArgumentException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (InvocationTargetException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}

				}
				TNonblockingServerSocket tnbSocketTransport = null;
				try {
					tnbSocketTransport = new TNonblockingServerSocket(port);
				} catch (TTransportException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				THsHaServer.Args tnbArgs = new THsHaServer.Args(tnbSocketTransport);
				tnbArgs.processor(processor);
				tnbArgs.transportFactory(new TFramedTransport.Factory());
				tnbArgs.protocolFactory(new TCompactProtocol.Factory());
				tnbArgs.executorService(Executors.newCachedThreadPool());

				// 使用非阻塞式IO，服务端和客户端需要指定TFramedTransport数据传输的方式
				TServer server = new THsHaServer(tnbArgs);
				server.setServerEventHandler(new StartServerEventHandler());
				server.serve();

			}

		}.start();

	}

	class StartServerEventHandler implements TServerEventHandler {
		@Override
		public void preServe() {
			/*
			 * 需要实现这个方法，以便在服务启动成功后， 通知mainProcessor： “Apache Thrift已经成功启动了”
			 */
			canZkStart = true;
			synchronized (ZKServer.this) {
				ZKServer.this.notify();
			}

		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see
		 * org.apache.thrift.server.TServerEventHandler#createContext(org.apache
		 * .thrift.protocol.TProtocol, org.apache.thrift.protocol.TProtocol)
		 */
		@Override
		public ServerContext createContext(TProtocol input, TProtocol output) {
			/*
			 * 无需实现
			 */
			return null;
		}

		@Override
		public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
			/*
			 * 无需实现
			 */
		}

		@Override
		public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
			/*
			 * 无需实现
			 */
		}
	}

	private class MyWatch implements Watcher {

		/*
		 * (non-Javadoc)
		 * 
		 * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.
		 * WatchedEvent)
		 */
		@Override
		public void process(WatchedEvent event) {
			// TODO Auto-generated method stub

		}

	}
}
